package com.xl.competition.old.task1.f1

import org.apache.spark.sql.{Row, SparkSession}

import java.util.Properties

/**
 * @author: xl
 * @createTime: 2023/11/12 17:37:31
 * @program: com.xl.competition
 */
object DataLoadOrders {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    //初始化sparkSession对象
    val sc: SparkSession = SparkSession
      .builder() //利用builder方法构建
      .master("local[*]") //本地模式运行下的线程资源  提交集群时不写
      .appName(this.getClass.getName) //应用名字  方便日志跟踪
      .enableHiveSupport() //支持hive  要写到hive表中
      .config("hive.metastore.uris", "thrift://master:9083") //hive的元数据地址 提交集群可不写但要有hive的环境变量
      .getOrCreate()
    //MYSQL的连接信息
    val properties = new Properties()
    properties.put("user", "root") //用户
    properties.put("password", "Abc123..") //密码

    sc
      .read
      .jdbc("jdbc:mysql://192.168.0.1:3306/competition", "orders", properties)
      .createTempView("tmp_orders") //创建临时视图
    //先在hive中建表 如果hive中已经存在可忽略
    sc.sql(
      """
        |CREATE TABLE if not exists ods.orders
        |(
        |    `orderkey`      int,
        |    `custkey`       int,
        |    `orderstatus`   string,
        |    `totalprice`    decimal,
        |    `orderdate`     string,
        |    `orderpriority` string,
        |    `clerk`         int,
        |    `shippriority`  string
        |) PARTITIONED BY (`dt` STRING)
        |    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
        |        NULL DEFINED AS ''
        |    LOCATION '/warehouse/competition/ods/ods_orders/'
        |""".stripMargin)

    //获取之前orderkey的最大值
    val row: Row = sc.sql(
      """
        |select max(orderkey) from ods.orders limit 1
        |""".stripMargin)
      .take(1)(0)

    //转换为int类型 if是为了防止为空
    var oldKey = 0;
    if (!row.isNullAt(0)) {
      oldKey = row.getInt(0)
    }
    println(oldKey)

    //执行插入语句  从临时视图中读取数据写入到ods.customer中
    //MySQL中我故意将orderdate的类型弄成 "yyyy-MM-dd" 和 "yyyy-MM-dd HH:mm:ss" 两种混合  目的是为了熟悉日期函数的使用
    sc.sql(
      s"""
         |insert into table ods.orders partition (dt = '20231108')
         |select orderkey,
         |       custkey,
         |       orderstatus,
         |       totalprice,
         |       from_unixtime(
         |               nvl(
         |                       unix_timestamp(orderdate, "yyyy-MM-dd"),
         |                       unix_timestamp(orderdate, "yyyy-MM-dd HH:mm:ss")
         |                   ),
         |                  'yyyy-MM-dd') orderdate,
         |       orderpriority,
         |       clerk,
         |       shippriority
         |from tmp_orders
         |where orderkey > ${oldKey}
         |and
         |nvl(unix_timestamp(orderdate,"yyyy-MM-dd"), unix_timestamp(orderdate,"yyyy-MM-dd HH:mm:ss")) >
         |unix_timestamp('2023-11-08','yyyy-MM-dd')
         |""".stripMargin)

    sc.stop()
  }
}
